package com.huan.hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.net.URI;

/**
 * 现在有2个文件，订单文件和商品文件，其中 订单文件中保留着 商品id， 而商品id对应的中文名在 商品文件 中，现在
 * 需要将 订单文件中的 商品id 显示成 中文
 *
 * @author huan.fu
 * @date 2023/7/16 - 15:17
 */
public class MapJoinDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // 构建配置对象
        Configuration configuration = new Configuration();
        // 使用 ToolRunner 提交程序
        int status = ToolRunner.run(configuration, new MapJoinDriver(), args);
        // 退出程序
        System.exit(status);
    }

    @Override
    public int run(String[] args) throws Exception {
        // 构建Job对象实例 参数（配置对象，Job对象名称）
        Job job = Job.getInstance(getConf(), MapJoinDriver.class.getName());
        // 设置mr程序运行的主类
        job.setJarByClass(MapJoinDriver.class);
        // 设置mr程序运行的 mapper类型和reduce类型
        job.setMapperClass(MapJoinMapper.class);
        // 指定mapper阶段输出的kv数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 指定reduce阶段输出的kv数据类型，业务mr程序输出的最终类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 配置本例子中的输入数据路径和输出数据路径，默认输入输出组件为： TextInputFormat和TextOutputFormat 如果是集群运行,需要设置HDFS路径
        FileInputFormat.setInputPaths(job, new Path("/Users/huan/code/IdeaProjects/me/spring-cloud-parent/hadoop/mr-map-join/order.txt"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/huan/code/IdeaProjects/me/spring-cloud-parent/hadoop/mr-map-join/output"));

        // 加载缓存数据(如果是集群运行,需要设置HDFS路径)
        job.addCacheFile(URI.create("/Users/huan/code/IdeaProjects/me/spring-cloud-parent/hadoop/mr-map-join/product.txt"));
        // Map端Join的逻辑不需要Reduce阶段，设置reduceTask数量为0
        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
